{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ETL create new etl process\n",
    "> Create your custom ETL process to the ETL pipeline.\n",
    "\n",
    "when you want to create your own ETL process, it could be tricky.\n",
    "here is a simple example to show you where to start to create your own ETL process."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 1. Start from ETL Pipeline you wanna add your own ETL process\n",
    "> simple ETL pipeline to load huggingface dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: ETL\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___huggingface___hf2raw\n",
      "  args:\n",
      "    name_or_path:\n",
      "    - ai2_arc\n",
      "    - ARC-Challenge\n",
      "- name: data_load___huggingface___ufl2hf_obj\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from omegaconf import OmegaConf\n",
    "\n",
    "# load from dict\n",
    "ETL_config = OmegaConf.create({\n",
    "    'spark': {\n",
    "        'appname': 'ETL',\n",
    "        'driver': {'memory': '16g'},\n",
    "    },\n",
    "    'etl': [\n",
    "        {\n",
    "            'name': 'data_ingestion___huggingface___hf2raw',\n",
    "            'args': {'name_or_path': ['ai2_arc', 'ARC-Challenge']}\n",
    "        },\n",
    "        {\n",
    "            'name': 'data_load___huggingface___ufl2hf_obj'\n",
    "        }\n",
    "    ]\n",
    "})\n",
    "\n",
    "print(OmegaConf.to_yaml(ETL_config))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "23/11/14 19:02:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "23/11/14 19:02:29 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n",
      "Found cached dataset ai2_arc (/root/.cache/huggingface/datasets/ai2_arc/ARC-Challenge/1.0.0/1569c2591ea2683779581d9fb467203d9aa95543bb9b75dcfde5da92529fd7f6)\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "9233872e69a545e5a338bdc9b1154537",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "  0%|          | 0/3 [00:00<?, ?it/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "c5a31a78b3e9459fb28f8680b14491a8",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Creating parquet from Arrow format:   0%|          | 0/3 [00:00<?, ?ba/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Downloading and preparing dataset spark/-1076059055 to /root/.cache/huggingface/datasets/spark/-1076059055/0.0.0...\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dataset spark downloaded and prepared to /root/.cache/huggingface/datasets/spark/-1076059055/0.0.0. Subsequent calls will reuse this data.\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "Dataset({\n",
       "    features: ['answerKey', 'choices', 'id', 'question'],\n",
       "    num_rows: 2590\n",
       "})"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "\n",
    "# raw -> hf_obj\n",
    "spark, dataset = etl_pipeline.run(ETL_config)\n",
    "dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 2. choose where you wanna add your own ETL process\n",
    "> remove or comment out the following ETL process from config!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: ETL\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___huggingface___hf2raw\n",
      "  args:\n",
      "    name_or_path:\n",
      "    - ai2_arc\n",
      "    - ARC-Challenge\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from omegaconf import OmegaConf\n",
    "\n",
    "# load from dict\n",
    "ETL_config = OmegaConf.create({\n",
    "    'spark': {\n",
    "        'appname': 'ETL',\n",
    "        'driver': {'memory': '16g'},\n",
    "    },\n",
    "    'etl': [\n",
    "        {\n",
    "            'name': 'data_ingestion___huggingface___hf2raw',\n",
    "            'args': {'name_or_path': ['ai2_arc', 'ARC-Challenge']}\n",
    "        },\n",
    "        \n",
    "        # TODO: you want to add your own ETL process from here\n",
    "\n",
    "        # TODO: if so, you need to add the following ETL process!\n",
    "        #       remove or comment out the following ETL process\n",
    "        # {\n",
    "        #     'name': 'data_load___huggingface___ufl2hf_obj'\n",
    "        # }\n",
    "    ]\n",
    "})\n",
    "\n",
    "print(OmegaConf.to_yaml(ETL_config))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "23/11/14 19:02:42 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Found cached dataset ai2_arc (/root/.cache/huggingface/datasets/ai2_arc/ARC-Challenge/1.0.0/1569c2591ea2683779581d9fb467203d9aa95543bb9b75dcfde5da92529fd7f6)\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "1835993218fb488a9cc02bcdef0f49b9",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "  0%|          | 0/3 [00:00<?, ?it/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "\n",
    "# raw -> spark, data[rdd, Dataframe]\n",
    "spark, data = etl_pipeline.run(ETL_config)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "\n",
       "            <div>\n",
       "                <p><b>SparkSession - in-memory</b></p>\n",
       "                \n",
       "        <div>\n",
       "            <p><b>SparkContext</b></p>\n",
       "\n",
       "            <p><a href=\"http://instance-3730:4040\">Spark UI</a></p>\n",
       "\n",
       "            <dl>\n",
       "              <dt>Version</dt>\n",
       "                <dd><code>v3.4.1</code></dd>\n",
       "              <dt>Master</dt>\n",
       "                <dd><code>local[10]</code></dd>\n",
       "              <dt>AppName</dt>\n",
       "                <dd><code>ETL</code></dd>\n",
       "            </dl>\n",
       "        </div>\n",
       "        \n",
       "            </div>\n",
       "        "
      ],
      "text/plain": [
       "<pyspark.sql.session.SparkSession at 0x7fe58d1ede40>"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "PythonRDD[13] at RDD at PythonRDD.scala:53"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 3. Check the current process so far!\n",
    "> use spark to check the current process so far!\n",
    "- `collect` is heavy so recommend to use `take` instead of `collect`!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'id': 'Mercury_7029645',\n",
       "  'question': 'Metal atoms will most likely form ions by the',\n",
       "  'choices': Row(text=['loss of electrons.', 'loss of protons.', 'gain of electrons.', 'gain of protons.'], label=['A', 'B', 'C', 'D']),\n",
       "  'answerKey': 'A'},\n",
       " {'id': 'Mercury_7216598',\n",
       "  'question': 'Which phrase does not describe asexual reproduction in organisms?',\n",
       "  'choices': Row(text=['requires two parents', 'little variation in offspring', 'only one type of cell involved', 'duplicates its genetic material'], label=['A', 'B', 'C', 'D']),\n",
       "  'answerKey': 'A'},\n",
       " {'id': 'MDSA_2008_5_40',\n",
       "  'question': 'A student is investigating changes in the states of matter. The student fills a graduated cylinder with 50 milliliters of packed snow. The graduated cylinder has a mass of 50 grams when empty and 95 grams when filled with the snow. The packed snow changes to liquid water when the snow is put in a warm room. Which statement best describes this process?',\n",
       "  'choices': Row(text=['Cooling causes the snow to melt.', 'Cooling causes the snow to freeze.', 'Heating causes the snow to freeze.', 'Heating causes the snow to melt.'], label=['A', 'B', 'C', 'D']),\n",
       "  'answerKey': 'D'}]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data.take(3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 4. Create your own ETL process\n",
 what do you want">
    "> What do you actually want to do with the data?\n",
    "\n",
    "Let's say you want to add a process that filters out a key to the ETL pipeline.\n",
    "- You want to remove the `choices` key from every row of the dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = data.map(lambda x: {k: v for k, v in x.items() if k != 'choices'})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'id': 'Mercury_7029645',\n",
       "  'question': 'Metal atoms will most likely form ions by the',\n",
       "  'answerKey': 'A'},\n",
       " {'id': 'Mercury_7216598',\n",
       "  'question': 'Which phrase does not describe asexual reproduction in organisms?',\n",
       "  'answerKey': 'A'},\n",
       " {'id': 'MDSA_2008_5_40',\n",
       "  'question': 'A student is investigating changes in the states of matter. The student fills a graduated cylinder with 50 milliliters of packed snow. The graduated cylinder has a mass of 50 grams when empty and 95 grams when filled with the snow. The packed snow changes to liquid water when the snow is put in a warm room. Which statement best describes this process?',\n",
       "  'answerKey': 'D'}]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data.take(3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Hey! it's working ;)! `choices` key are removed from the dataset!"
   ]
  },
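  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The lambda from step 4 can also be written as a small named function, which makes it easy to check on a plain dict before passing it to `data.map`. This is only a minimal sketch: `drop_key` is a hypothetical helper name used for illustration and is not part of dataverse.\n",
    "\n",
    "```python\n",
    "def drop_key(row, key='choices'):\n",
    "    # return a copy of the row dict without `key`; the input row is left untouched\n",
    "    return {k: v for k, v in row.items() if k != key}\n",
    "\n",
    "# quick check on a plain dict shaped like one row of the dataset\n",
    "sample = {'id': 'Mercury_7029645', 'answerKey': 'A', 'choices': ['...']}\n",
    "cleaned = drop_key(sample)\n",
    "print(cleaned)\n",
    "```\n",
    "\n",
    "With the RDD from step 2, this would be applied as `data.map(drop_key)`."
   ]
  },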
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 5. Working? It's time to add to the ETL Registry\n",
    "> working great? it's time to move to how to add to the ETL Registry!\n",
    "[ETL_add_new_etl_process.ipynb](https://github.com/UpstageAI/dataverse/blob/main/guideline/etl/ETL_add_new_etl_process.ipynb)\n",
    "\n",
    "Check out the guideline from above notebook. and for preview here is the function template to add to the ETL Registry.\n",
    "\n",
    "```python\n",
    "# before\n",
    "data = data.map(lambda x: {k: v for k, v in x.items() if k != 'choices'})\n",
    "\n",
    "# after\n",
    "def your___custom___etl_process(spark, data, *args, **kwargs):\n",
    "    # add your custom process here\n",
    "    # here we are going to simply remove 'choices' key\n",
    "    data = data.map(lambda x: {k: v for k, v in x.items() if k != 'choices'})\n",
    "\n",
    "    return data\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llm",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
